1.1.1. 执行模式(流/批)
Datastream API 支持流/批两种运行模式。最常见的就是实时数据流处理,即 STREAMING mode。除此之外,Flink 在 Datastream API 上也实现了 BATCH mode,类似 MR,Spark 的批处理,这适用于有界数据流处理,常用于状态初始化。
在 Datastream API 上统一流/批处理意味着对于相同的有界输入,不管使用何种模式,程序返回的最终结果是一致的,只不过可能底层执行过程有所不同。
使用 BATCH mode 时,Flink 对于有界数据流的处理会采用不同的执行策略(比如 join ),相比流处理,批处理的调度和任务恢复会更简单高效,这里可以联想下 Spark 的批处理。
1.1.2. 何时使用 Datastream 的批执行模式
BATCH mode 只能用于有界数据流处理。而 STREAMING mode 可以处理有界和无界数据流。通常,我们使用批模式来处理有界数据,因为这种方式更加高效。
有些时候,我们需要初始化无界数据流的状态。此时,可以先采用 STREAMING mode 处理有界数据,生成 savepoint。在通过 savepoint 启动无界数据流任务,从而完成任务状态的衔接。
1.1.3. 配置批执行模式
执行模式可以通过 execution.runtime-mode 设置进行配置,存在以下三种取值:
- STREAMING:Datastream 默认模式
- BATCH - Datastream 批模式
- AUTOMATIC - 根据数据源的有界性自动判断使用何种运行模式。
也可以通过 bin/flink run ... 的命令行参数进行配置,或者在创建 StreamExecutionEnvironment 以编程方式进行配置。
命令行提交任务时配置
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
在 env 中配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议在命令行中配置,因为这样程序拓展性更好,避免了同一套逻辑在流/批之间的切换。
1.1.4. 流/批下的 Execution Behavior
本小节主要讲下 Datastream API 在流/批模式下的一些不同。
任务调度和 shuffle
Flink 程序包含多个 operator,这些 operator 从前往后按照某种模式连接起来就构成了数据处理的 pipeline。那么如何在多个进程乃至机器之间调度这些 operator?各个 operator 间又是怎样进行数据交换的呢?
在执行过程中,许多 operator 可以 chain 在一起,旨在降低资源的过多使用,提高性能。一个 operator 或者多个 chain 在一起的 operator组成 Flink 程序的最小调度单元。
任务调度和网络数据交换在流/批模式下是不同的,对于有界数据流的处理,可以使用更多的数据结构和策略,因而相比 STREAMING mode 处理有界数据流更高效。下面会使用一个例子说明这些不同点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
对于一对一连接的 operator,像 map()、filter() 这些可以直接将数据发往下游 operator,因此这些 operator 可以 chain 在一起。这也意味着这些 operator 间不会产生 shuffle。而像 keyBy()、reblance() 这些 operator,往往需要将数据发往不到不同的节点(这取决于任务的本地化级别),从而会产生网络数据交换。
上面的示例会生成三个 task
- Task1: source、map1、map2
- Task2: map3、 map4
- Task3: map5、map6、sink
在 task1 和 taks2,task2 和 task3 之间会产生网络 shuffle。下面是该 job 逻辑图的简单描述
流模式
在流模式下,所有的任务都要一直运行,这使得 Flink 程序一接到数据就可以流经整条 pipeline 快速处理。这也意味着 TaskManager 需要分配足够的资源保证同时启动所有的 task 并可以正常运行。 数据交换也是基于 pipeline 的,数据会被立刻发往下游,只不过在网络侧会存在缓冲区,旨在提高吞吐量。在流式处理中,shuffle 的中间结果是不会落盘的。
批模式
在批模式下,任务会被划分成几个阶段,从前往后逐一实行(当然,毫不相关的前置任务也是可以并行执行的)。我们能做到这些是因为数据流是有界的,我们可以处理完一个阶段的所有数据后在接着处理。像上面的例子会被划分成三个 stage,这里 stage 的划分和 Spark 是一样的,碰到 shuffle dependency 就断开 pipeline。
不同于流模式下直接发送数据,批模式会先 shuffle write 将数据落盘,下一阶段在进行 shuffle read 读取上游落盘后的数据。这虽然会增大处理延迟,但是对于批处理是有益的。
- 任务失败时,不必重启所有的 task,而是从上次缓存的中间结果处开始。
- 更少的资源使用,不必一开始就启动所有的 task。
TaskManager 会保留中间结果,直到下游 task 开始读取。并且 TM 会在空间允许的情况下一直保留该中间结果,方便某个阶段任务失败重启。
状态管理
在流模式下,Flink 通过状态后端来控制状态的存储和 checkpoint 的生成。
在批模式下,配置的状态后端被忽略。数据流可以按照 key 被顺序处理,因此同一时刻只有一个 key 的状态,在处理下一个 key 时,上一个 key 的状态会被清理掉。
处理顺序
在流模式下,不考虑数据的顺序,在数据到达后就会根据程序逻辑尽可能快的处理。
在批模式下,有些 operator 的操作是可以保证顺序的,这取决于系统执行某些操作时的策略,也可能是 shuffle,任务调度的副作用。
一般分为三种输入,批模式下会按照如下顺序处理
- broadcast input: input from a broadcast stream。
- regular input: input that is neither broadcast nor keyed。
- keyed input: input from a KeyedStream。
事件时间/ watermark
Flink 使用 watermark 机制来处理无序数据流。在批模式下,数据流是有界的,我们可以确定某些时间前的数据都已经到达了,因此可以直接指定一个最大的 watermark 值,计算会在最后被触发,注册的 WatermarkAssigners 和 WatermarkGenerators 会被忽略。
处理时间
处理时间是程序处理数据那一刻的机器时间,基于此机制的计算是无法保证幂等的,因此同样的数据在两次处理过程中的处理时间可能是不同的。
尽管如此,在流模式下使用处理时间仍然是有些场景的。在数据延迟乱序可控的情况下,处理时间的一小时相当于事件时间的一小时,使用处理时间计算会更快的被触发。
在批模式下,和事件时间的处理是一样的,在最后的输入后触发整体的计算。
任务恢复
在流模式下,Flink 使用 checkpoint 实现容错管理,任务失败后,会从 checkpoint 恢复并重新启动调度所有的 task 。
在批模式下,任务失败后,Flink 会尝试回溯到上一个阶段重新开始,默认只有失败的上游 task 会被重启,从而减少任务恢复时间。
1.1.5. 思考
1. Datastream 在批模式下和 Dataset 有啥区别呢?